package com.at.checkpoint15;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * 检查点
 *
 * @author huangchao E-mail:fengquan8866@163.com
 * @version 创建时间：2024/9/30 21:51
 */
public class CheckConfigDemo1 {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // TODO 最终检查点： 1.15开始，默认true
//        configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(1);

        // TODO 开启 changelog
        // 要求Checkpoint的最大并发必须为1，其他参数建议在flink-conf配置文件中去指定
        env.enableChangelogStateBackend(true);

        // 导入hadoop依赖，指定访问hdfs的用户名
        System.setProperty("HADOOP_USER_NAME", "root");

        // TODO 检查点配置
        // 1、启用检查点
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // 2、指定检查点的存储位置
        checkpointConfig.setCheckpointStorage("hdfs://master:8020/flink/checkpoint");
        // 3、checkpoint超时时间: 默认10分钟
        checkpointConfig.setCheckpointTimeout(60*1000);
        // 4、同时运行中的checkpoint的最大数量
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // 5、最小等待时间: 上一轮checkpoint结束 到 下一轮 checkpoint开始 之间的间隔，设置了>0，并发就会变成1
        checkpointConfig.setMinPauseBetweenCheckpoints(1000);
        // 6、取消作业时，checkpoint的数据保留在外部系统 是否保留在外部系统
        // DELETE_ON_CANCELLATION: 主动cancel时，删除存在外部系统的 chk-xx 目录（如果是程序突然挂掉，不会删）
        // RETAIN_ON_CANCELLATION：主动cancel时，外部系统的 chk-xx 目录会保存下来
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 7、允许 checkpoint 连续失败的次数，默认0--》 表示checkpoint一失败，Job就挂掉
        checkpointConfig.setTolerableCheckpointFailureNumber(10);

        // TODO 开启 非对齐检查点 （Barrier 非对齐）
        // 开启之后的要求：Checkpoint模式必须是精准一次，最大并发必须设为1
        checkpointConfig.enableUnalignedCheckpoints();
        // 开启非对齐检查点才生效：默认0，表示一开始就直接用 非对齐检查点
        // 如果>0，一开始用 对齐检查点（Barrier对齐），对齐的时间超过这个参数，自动切换成 非对齐检查点（Barrier非对齐）
        checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));

        env.socketTextStream("localhost", 7777)
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
//                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
    }
}
/**
 * TODO 检查点算法的总结
 * 1、Barrier对齐： 一个 Task 收到 所有上游 同一个编号的 Barrier 之后，才会对自己的本地状态做 备份
 *      精准一次： 在Barrier对齐过程中，Barrier后面的数据 阻塞等待（不会越过Barrier）
 *      至少一次： 在Barrier对齐过程中，先到的Barrier，其后面的数据 不阻塞 接着计算
 *
 *
 * 2、非Barrier对齐： 一个Task 收到 第一个 Barrier 时，就开始 执行备份，能保证 精准一次（flink 1.11出的新算法）
 *      先到的Barrier，将 本地状态 备份，其后面的数据接着计算输出
 *      未到的Barrier，其 前面的数据 接着计算输出，同时 也保存到 备份中
 *      最后一个Barrier到达 该Task时，这个Task的备份结束
 */